package org.zjvis.datascience.web.controller;

import cn.weiguangfu.swagger2.plus.annotation.ApiGroup;
import cn.weiguangfu.swagger2.plus.annotation.ApiPlus;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.Lists;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.*;
import org.zjvis.datascience.common.annotation.PipelineAuth;
import org.zjvis.datascience.common.annotation.ProjectAuth;
import org.zjvis.datascience.common.annotation.ProjectRoleAuth;
import org.zjvis.datascience.common.dto.*;
import org.zjvis.datascience.common.enums.*;
import org.zjvis.datascience.common.exception.*;
import org.zjvis.datascience.common.model.ApiResult;
import org.zjvis.datascience.common.model.ApiResultCode;
import org.zjvis.datascience.common.model.stat.ColumnAction;
import org.zjvis.datascience.common.util.DozerUtil;
import org.zjvis.datascience.common.util.JwtUtil;
import org.zjvis.datascience.common.vo.*;
import org.zjvis.datascience.common.vo.dataset.DatasourceLocateVO;
import org.zjvis.datascience.common.vo.pipeline.TPIdComboVO;
import org.zjvis.datascience.common.vo.project.ProjectNameVO;
import org.zjvis.datascience.common.widget.dto.WidgetDTO;
import org.zjvis.datascience.service.*;
import org.zjvis.datascience.service.dag.DAGScheduler;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.validation.Valid;
import java.util.*;

/**
 * @description 任务节点模型接口 Controller
 * @date 2021-12-29
 */
@ApiPlus(value = true)
@RequestMapping("/task")
@RestController
@Api(tags = "task", description = "任务节点模型接口")
@Validated
public class TaskController {

    private final static Logger logger = LoggerFactory.getLogger(TaskController.class);

    @Autowired
    private TaskService taskService;

    @Autowired
    private DAGScheduler dagScheduler;

    @Autowired
    private FastTextService fastTextService;

    @Autowired
    private PanelService panelService;

    @Autowired
    private JlabService jlabService;

    @Autowired
    private OperatorFavouriteService operatorFavouriteService;

    @Autowired
    private OperatorTemplateService operatorTemplateService;

    @Autowired
    private DataPreviewService dataPreviewService;

    @Autowired
    private WidgetService widgetService;

    @Autowired
    private TColumnService tColumnService;

    @Autowired
    private ActionService actionService;

    @Autowired
    private AutoJoinService autoJoinService;

    @PostMapping(value = "/undo")
    @ResponseBody
    @ApiOperation(value = "撤销", notes = "撤销")
    @Transactional
    public ApiResult<Void> undo(HttpServletRequest request,
        @ProjectRoleAuth(role = ProjectRoleAuthEnum.DEVELOPER, checkLock = true)
        @RequestBody @PipelineAuth() JSONObject params) {
        if (!params.containsKey("projectId") || !params.containsKey("pipelineId")
            || params.getLong("projectId") <= 0L || params.getLong("pipelineId") <= 0L) {
            logger.warn("API /task/undo failed, since {}", ApiResultCode.PARAM_ERROR.getMessage());
            return ApiResult.valueOf(ApiResultCode.PARAM_ERROR);
        }

        boolean flag = actionService.undo(params.getLong("pipelineId"));
        if (!flag) {
            logger.error("API /task/undo failed, since {}", "撤销失败");
            taskService.clearTaskUtil(params.getLong("pipelineId"));
            return ApiResult.valueOf(ApiResultCode.SYS_ERROR, null, "撤销失败");
        }
        taskService.clearTaskUtil(params.getLong("pipelineId"));
        return ApiResult.valueOf(ApiResultCode.SUCCESS);
    }

    @PostMapping(value = "/redo")
    @ResponseBody
    @ApiOperation(value = "重做", notes = "重做")
    @Transactional
    public ApiResult<Void> redo(HttpServletRequest request,
        @ProjectRoleAuth(role = ProjectRoleAuthEnum.DEVELOPER, checkLock = true)
        @RequestBody @PipelineAuth() JSONObject params) {
        if (!params.containsKey("projectId") || !params.containsKey("pipelineId")
            || params.getLong("projectId") <= 0L || params.getLong("pipelineId") <= 0L) {
            logger.warn("API /task/redo failed, since {}", ApiResultCode.PARAM_ERROR.getMessage());
            return ApiResult.valueOf(ApiResultCode.PARAM_ERROR);
        }

        boolean flag = actionService.redo(params.getLong("pipelineId"));
        if (!flag) {
            taskService.clearTaskUtil(params.getLong("pipelineId"));
            logger.error("API /task/redo failed, since {}", "重做失败，不允许重做");
            return ApiResult.valueOf(ApiResultCode.SYS_ERROR, null, "重做失败，不允许重做");
        }
        taskService.clearTaskUtil(params.getLong("pipelineId"));
        return ApiResult.valueOf(ApiResultCode.SUCCESS);
    }

    @PostMapping(value = "/load")
    @Transactional
    @ResponseBody
    @ApiOperation(value = "算子模板加载", notes = "算子模板加载")
    public ApiResult<TaskVO> load(HttpServletRequest request,
        @ProjectRoleAuth(role = ProjectRoleAuthEnum.DEVELOPER, checkLock = true)
        @RequestBody @PipelineAuth() TaskVO vo) {
        if (vo.getUserId() == null) {
            vo.setUserId(JwtUtil.getCurrentUserId());
        }

        if (vo.getPipelineId() <= 0L) {
            logger.warn("API /task/load failed, since {}", "pipelineId is empty");
            return ApiResult.valueOf(ApiResultCode.PARAM_ERROR);
        }

        if (vo.getProjectId() <= 0L) {
            logger.warn("API /task/load failed, since {}", "project is empty");
            return ApiResult.valueOf(ApiResultCode.PARAM_ERROR);
        }

        if (vo.getData() == null || vo.getData().isEmpty()) {
            logger.warn("API /task/load failed, since {}", "data is empty");
            return ApiResult.valueOf(ApiResultCode.PARAM_ERROR);
        }
        if (vo.getType() == null || vo.getType() <= 0) {
            logger.warn("API /task/load failed, since {}", "type is empty");
            return ApiResult.valueOf(ApiResultCode.PARAM_ERROR);
        }
        JSONObject json = vo.getData();
        int type = vo.getType();

        if (TaskTypeEnum.TASK_TYPE_DATA.getVal() == type) {
            Long datasetId = json.getLong("dataset_id");
            if (datasetId == null || datasetId <= 0L) {
                logger.warn("API /task/load failed, since {}", "datasetId is empty");
                return ApiResult.valueOf(ApiResultCode.PARAM_ERROR);
            }
        }
        if (TaskTypeEnum.TASK_TYPE_JLAB.getVal() == type){
            JlabDTO labExist = jlabService.queryByUserId(vo.getUserId());
            if (labExist == null){
                JlabDTO jlab = new JlabDTO();
                jlab.setUserId(vo.getUserId());
                jlabService.insert(jlab);
                //Long labId = 158L;
                jlabService.exec(vo.getUserId(), jlab.getId(), ExeFlagEnum.INIT_NEW_LAB.getVal());
//                JlabService.load(vo);
            }
            else{
                if (labExist.getActive() == 0){
                    jlabService.start(labExist);
                }
                //mkdir projectName/taskName and getToken
            }
        }
        if (TaskTypeEnum.TASK_TYPE_GRAPH.getVal() == type && vo.getChildId() != null) {
            logger.warn("API /task/load failed, since {}", "网络构建节点无法拥有子节点");
            return ApiResult.valueOf(ApiResultCode.GRAPH_NO_CHILD);
        }

        String parentId = vo.getParentId();
        if (StringUtils.isNotEmpty(parentId)) {
            // 父节点不为空,检测父节点是否为网络构建节点
            String[] parentIds = parentId.split(",");
            for (String pid : parentIds) {
                TaskDTO parentNode = taskService.queryById(Long.parseLong(pid));
                if (TaskTypeEnum.TASK_TYPE_GRAPH.getVal().equals(parentNode.getType())) {
                    logger.warn("API /task/load failed, since {}", "网络构建节点无法拥有子节点");
                    return ApiResult.valueOf(ApiResultCode.GRAPH_NO_CHILD);
                }
            }
        }

        TaskDTO task;
        try {
            task = taskService.load(vo);
            taskService.updateTaskName(task, "add");
            taskService.syncNodeName(task);
        } catch (Exception e) {
            logger.error("API /task/load failed, since {}", e.getMessage());
            return ApiResult.valueOf(ApiResultCode.SYS_ERROR);
        }
        JSONObject jsonObject = JSONObject.parseObject(task.getDataJson());
        jsonObject.put("forbidden", false);
        task.setDataJson(jsonObject.toJSONString());
        Long id = taskService.save(task);
        task.setId(id);
        taskService.updateTaskNode(task.getPipelineId(), task, "add");
        TaskVO ret = task.view();
        if (request != null) {
            List<TaskDTO> taskArray = new ArrayList<>();
            taskArray.add(task);
            actionService.addActionForTask(ActionEnum.ADD, taskArray, null);
        }
        if (TaskTypeEnum.TASK_TYPE_ALGOPY.getVal() == type && parentId != null) {
            //TODO
            TaskDTO parentNode = taskService.queryById(Long.valueOf(parentId));
            TaskDTO childNode = taskService.dataJsonUpdate(parentNode,task);
            if (JSONObject.parseObject(childNode.getDataJson()).containsKey("warning")){
                ret = childNode.view();
            }
        }
        if (TaskTypeEnum.TASK_TYPE_JLAB.getVal() == type){
            jlabService.load(task.view());
        }
        return ApiResult.valueOf(ret);
    }

    @PostMapping(value = "/load_panel")
    @ResponseBody
    @ApiOperation(value = "加载视图左侧算子", notes = "加载视图左侧算子")
    public ApiResult<PanelDTO> loadPanel(HttpServletRequest request,
        @RequestBody @ProjectRoleAuth() ProjectNameVO vo)
        throws IllegalAccessException, InstantiationException, ClassNotFoundException {
        vo.setUserId(JwtUtil.getCurrentUserId());
        PanelDTO panelDTO = panelService.loadPanel(vo);
        return ApiResult.valueOf(panelDTO);
    }

    @PostMapping(value = "/save")
    @ResponseBody
    @ApiOperation(value = "新增算子", notes = "新增算子")
    public ApiResult<Long> save(HttpServletRequest request,
        @ProjectRoleAuth(role = ProjectRoleAuthEnum.DEVELOPER, checkLock = true)
        @RequestBody @PipelineAuth() TaskVO vo) {
        if (vo.getUserId() == null) {
            vo.setUserId(JwtUtil.getCurrentUserId());
        }

        if (vo.getPipelineId() <= 0L) {
            logger.warn("API /task/save failed, since {}", "pipelineId is empty");
            return ApiResult.valueOf(ApiResultCode.PARAM_ERROR);
        }

        if (vo.getProjectId() <= 0L) {
            logger.warn("API /task/save failed, since {}", "projectId is empty");
            return ApiResult.valueOf(ApiResultCode.PARAM_ERROR);
        }

        if (vo.getData() == null || vo.getData().isEmpty()) {
            logger.warn("API /task/save failed, since {}", "data is empty");
            return ApiResult.valueOf(ApiResultCode.PARAM_ERROR);
        }
        TaskDTO dto = vo.toTask();
        Long id = taskService.save(dto);
        dto.setId(id);
        taskService.updateTaskName(dto, "add");
        taskService.updateTaskNode(dto.getPipelineId(), dto, "add");
        if (request != null) {
            TaskDTO taskDTO = taskService.queryById(id);
            List<TaskDTO> array = new ArrayList<>();
            array.add(taskDTO);
            actionService.addActionForTask(ActionEnum.ADD, array, null);
        }
        return ApiResult.valueOf(id);
    }

    @PostMapping(value = "/simpleUpdate")
    @ResponseBody
    @Transactional
    @ApiOperation(value = "更新算子", notes = "更新算子")
    public ApiResult<Long> simpleUpdate(HttpServletRequest request,
        @ProjectRoleAuth(role = ProjectRoleAuthEnum.DEVELOPER, checkLock = true)
        @RequestBody @PipelineAuth() TaskVO vo) {
        // 该更新只针对节点位置，节点名称的修改操作
        if (vo.getPipelineId() == null || vo.getPipelineId() <= 0L || vo.getId() == null
            || vo.getId() <= 0L) {
            logger.warn("API /task/simpleUpdate failed, since {}", "pipelineId is empty");
            return ApiResult.valueOf(ApiResultCode.PARAM_ERROR);
        }
        if (vo.getUserId() == null) {
            vo.setUserId(JwtUtil.getCurrentUserId());
        }

        TaskDTO oldTask = null;
        TaskDTO newTask = null;
        if (request != null) {
            oldTask = taskService.queryById(vo.getId());
        }
        TaskDTO dto = new TaskDTO();
        boolean updateX = false;
        //TODO 移进simpleUpdate 里去
        //update feature_cols as X for prediction ONLY FOR MODEL NODE
        if (vo.getType() != null && vo.getType() == TaskTypeEnum.TASK_TYPE_MODEL.getVal()) {
            try {
                String X = vo.getData().getString("feature_col");
                if (!X.equals("") && !X.equals(null)) {
                    updateX = true;
                }
            } catch (Exception e) {
                logger.error("API /task/simpleUpdate failed, since {}", e.getMessage());
                updateX = false;
            }
            if (updateX) {
                String X = vo.getData().getString("feature_col");
                TaskDTO task = oldTask;
                JSONObject dataJson = JSONObject.parseObject(task.getDataJson());
                dataJson.put("feature_X", X);
                String dataString = String.valueOf(dataJson);
                task.setDataJson(dataString);
                if (vo.getName() != null && vo.getName() != "") {
                    task.setName(vo.getName());
                }
                String childId = vo.getChildId();
                if (childId != null) {
                    task.setChildId(childId);
                }
                String parentId = vo.getParentId();
                if (parentId != null && parentId != "") {
                    task.setParentId(parentId);
                }
                dto = task;
            }
        }
        if (!updateX) {
            dto = vo.toTask();
        }
        if (vo.getType() != null && vo.getType() == TaskTypeEnum.TASK_TYPE_ALGOPY.getVal() ||
                vo.getType() != null && vo.getType() == TaskTypeEnum.TASK_TYPE_JLAB.getVal() ){
            TaskDTO task = oldTask;
            //todo
            if (task != null){
                JSONObject newformData = vo.getData();
                JSONObject oldDataJson = JSONObject.parseObject(oldTask.getDataJson());
                JSONObject setParams = oldDataJson.getJSONArray("setParams").getJSONObject(0);
                setParams.remove("formData");
                setParams.put("formData",newformData);
                task.setDataJson(oldDataJson.toString());

                String childId = vo.getChildId();
                if (childId != null) {
                    task.setChildId(childId);
                }
                String parentId = vo.getParentId();
                if (parentId != null && parentId != "") {
                    task.setParentId(parentId);
                }
                dto = task;
            }
        }
        if (oldTask != null && vo.getName() != null
                && oldTask.getType() == TaskTypeEnum.TASK_TYPE_JLAB.getVal()
                && !vo.getName().equals(oldTask.getName())){
            jlabService.renameTaskDir(vo, oldTask.getName());
        }

        List<TaskDTO> ctx1 = new ArrayList<>();
        List<TaskDTO> ctx2 = new ArrayList<>();
        List<ApiResultCode> errorCode = new ArrayList<>();
        if (!taskService.simpleUpdate(dto, errorCode, request)) {
            if (dto.getException() != null) {
                logger.error("API /task/simpleUpdate failed, since {}",
                    dto.getException().getMessage());
                if (dto.getException() instanceof UnionStatementParserException) {
                    return new ApiResult(ApiResultCode.UNION_TYPE_UNMATCHED, dto.getException().getMessage());
                } else if (dto.getException() instanceof SqlParserException) {
                    return new ApiResult(ApiResultCode.SQL_NOT_SUPPORT, dto.getException().getMessage());
                } else {
                    return new ApiResult(ApiResultCode.SYS_ERROR, dto.getException().getMessage());
                }
            }
            logger.error("API /task/simpleUpdate failed, since {}", "更新失败");
            if (errorCode.size() > 0){
                return ApiResult.valueOf(errorCode.get(0));
            } else {
                return new ApiResult(ApiResultCode.SYS_ERROR, "参数更新失败，请稍后重试");
            }
        }
        if (dto.getException() != null) {
            if (dto.getException() instanceof AutoTriggerException) {
                //针对 连线中间添加 JOIN/UNION/SQL节点
                return new ApiResult(ApiResultCode.AUTO_TRIGGER_FAILED, dto.getException().getMessage());
            }else {
                return new ApiResult<Long>(ApiResultCode.SYS_ERROR, dto.getException().getMessage());
            }
        }
        taskService.updateTaskName(dto, "update");
        taskService.updateTaskNode(vo.getPipelineId(), dto, "update");
        if (request != null) {
            newTask = taskService.queryById(vo.getId());
            ctx1.add(oldTask);
            ctx2.add(newTask);
            actionService.addActionForTask(ActionEnum.UPDATE, ctx1, ctx2);
        }
        return ApiResult.valueOf(vo.getId());
    }

    @PostMapping(value = "/update")
    @ResponseBody
    @Transactional
    @ApiOperation(value = "更新算子", notes = "更新算子")
    public ApiResult<Long> update(HttpServletRequest request,
        @ProjectRoleAuth(role = ProjectRoleAuthEnum.DEVELOPER, checkLock = true)
        @RequestBody @PipelineAuth() TaskVO vo) {
        if (vo.getUserId() == null) {
            vo.setUserId(JwtUtil.getCurrentUserId());
        }

        if (vo.getPipelineId() <= 0L) {
            logger.warn("API /task/update failed, since {}", "pipelineId is empty");
            return ApiResult.valueOf(ApiResultCode.PARAM_ERROR);
        }

        if (vo.getProjectId() <= 0L) {
            logger.warn("API /task/update failed, since {}", "projectId is empty");
            return ApiResult.valueOf(ApiResultCode.PARAM_ERROR);
        }
        TaskDTO oldTask = null;
        TaskDTO newTask = null;
        if (request != null) {
            oldTask = taskService.queryById(vo.getId());
        }
        List<TaskDTO> ctx1 = new ArrayList<>();
        List<TaskDTO> ctx2 = new ArrayList<>();
        TaskDTO dto = vo.toTask();
        List<ApiResultCode> errorCodes = new ArrayList<>();
        boolean retFlag = taskService.preUpdate(dto, errorCodes);
        if (!retFlag) {
            return ApiResult.valueOf(errorCodes.get(0));
        }
//        boolean needParamVerify = taskService.isNeedParamVerify(dto);
//        boolean flag = taskService.verifyParams(dto, errorCodes);
//        //校验通过后，再进行数值配置参数的合法性校验
//        flag = true;
////    flag =
////        flag ? (needParamVerify ? taskService.validateConfigParams(errorCodes, dto) : true) : false;
//        if (!flag) {
//            if (errorCodes.size() != 0) {
//                return ApiResult.valueOf(errorCodes.get(0));
//            } else {
//                return ApiResult.valueOf(ApiResultCode.SYS_ERROR);
//            }
//        }
        taskService.validateTableName(dto);
        taskService.update(dto);
        taskService.updateTaskName(dto, "update");
        taskService.updateTaskNode(dto.getPipelineId(), dto, "update");
        if (request != null) {
            newTask = taskService.queryById(vo.getId());
            ctx1.add(oldTask);
            ctx2.add(newTask);
            actionService.addActionForTask(ActionEnum.UPDATE, ctx1, ctx2);
        }
        return ApiResult.valueOf(vo.getId());
    }

    @PostMapping(value = "/copy")
    @ResponseBody
    @ApiOperation(value = "拷贝算子", notes = "拷贝算子")
    public ApiResult<Long> copy(HttpServletRequest request,
        @RequestBody @ProjectAuth(auth = ProjectAuthEnum.WRITE) TaskVO vo) {
        if (vo.getId() == null || vo.getId() < 0L) {
            logger.warn("API /task/copy failed, since {}", "pipelineId is empty");
            return ApiResult.valueOf(ApiResultCode.PARAM_ERROR);
        }
        List<ApiResultCode> errorCode = new ArrayList<>();
        Long id = taskService.copy(vo, errorCode);
        if (id == null || errorCode.size() != 0) {
            return ApiResult.valueOf(errorCode.get(0));
        }
        if (request != null) {
            TaskDTO taskDTO = taskService.queryById(id);
            List<TaskDTO> ctx1 = new ArrayList<>();
            ctx1.add(taskDTO);
            actionService.addActionForTask(ActionEnum.ADD, ctx1, null);
        }
        return ApiResult.valueOf(id);
    }

    @PostMapping(value = "/queryById")
    @ResponseBody
    @ApiOperation(value = "查询算子", notes = "根据id查询单个算子")
    public ApiResult<TaskVO> queryById(HttpServletRequest request, @RequestBody TaskVO vo) {
        if (vo.getUserId() == null) {
            vo.setUserId(JwtUtil.getCurrentUserId());
        }
        TaskDTO task = taskService.queryById(vo.getId());
        if (null != task) {
            TaskVO ret = task.view();
            return ApiResult.valueOf(ret);
        }else {
            return ApiResult.valueOf(null);
        }
    }

    @PostMapping(value = "/queryByPipeline")
    @ResponseBody
    @ApiOperation(value = "批量查询算子", notes = "根据pipeline批量查询算子")
    public ApiResult<List<TaskVO>> queryByPipeline(HttpServletRequest request,
        @RequestBody @ProjectAuth(auth = ProjectAuthEnum.READ, field = "id") TaskVO vo) {
        //前端这里传过来的id 其实是projectId 并不是taskId 应该改掉
        if (vo.getUserId() == null) {
            vo.setUserId(JwtUtil.getCurrentUserId());
        }

        if (vo.getPipelineId() <= 0L) {
            logger.warn("API /task/queryByPipeline failed, since {}", "pipelineId is empty");
            return ApiResult.valueOf(ApiResultCode.PARAM_ERROR);
        }

        List<TaskDTO> tasks = taskService.queryByPipeline(vo.getPipelineId());
        List<TaskVO> ret = Lists.newArrayList();
        for (TaskDTO task : tasks) {
            TaskVO view = task.view();
            //前端展示清洗节点的操作用
            if (TaskTypeEnum.TASK_TYPE_CLEAN.getVal().equals(view.getType())) {
                List<ColumnAction> columnActions = tColumnService.queryAction(view.getId());
                view.getData().put("cleanActions", columnActions);
            }
            ret.add(view);
        }
        return ApiResult.valueOf(ret);
    }

    @PostMapping(value = "/preview")
    @ResponseBody
    @ApiOperation(value = "算子数据预览", notes = "算子数据预览")
    public ApiResult<List<DataPreviewVO>> preview(HttpServletRequest request, @RequestBody TaskVO vo) {
        if (vo.getId() == null || vo.getId() <= 0L || vo.getPipelineId() == null
            || vo.getPipelineId() <= 0L) {
            logger.warn("API /task/preview failed, since {}", "pipelineId is empty");
            return ApiResult.valueOf(ApiResultCode.PARAM_ERROR);
        }
        Long taskId = vo.getId();
        Long pipelineId = vo.getPipelineId();
        List<DataPreviewVO> previewVOS = dataPreviewService.preview(taskId, pipelineId);
        return ApiResult.valueOf(previewVOS);
    }

    @PostMapping(value = "/batchCopy")
    @ResponseBody
    @Transactional
    @ApiOperation(value = "批量拷贝", notes = "批量拷贝")
    public ApiResult<Void> batchCopy(HttpServletRequest request,
        @ProjectRoleAuth(role = ProjectRoleAuthEnum.DEVELOPER, checkLock = true)
        @RequestBody @PipelineAuth() @Valid TPIdComboVO vo) {
        //如果指定了pipelineId 就不会重新分配节点位置，以及重命名 （用于跨项目，跨pipeline复制）
        List<TaskDTO> targetTasks = taskService.queryByIds(vo.getTaskIds());

        if (vo.getIsCross()){ //用于跨项目，跨pipeline复制时， isCross为true TODO 前端还没做
            targetTasks.forEach(e -> e.setPipelineId(vo.getPipelineId()));
        }
        if (!taskService.batchCopy(targetTasks, vo.getIsCross() ? vo.getPipelineId() : null)) {
            logger.error("API /task/batchCopy failed, since {}", "批量更新失败");
            return ApiResult.valueOf(ApiResultCode.SYS_ERROR);
        }

        //redo undo 记录入栈
        if (request != null) {
            List<TaskDTO> newTasks = Lists.newArrayList(targetTasks);
            actionService.addActionForTask(ActionEnum.ADD, newTasks, null);
        }

        return ApiResult.valueOf(ApiResultCode.SUCCESS);
    }

    @PostMapping(value = "/batchDelete")
    @ResponseBody
    @Transactional
    @ApiOperation(value = "批量删除", notes = "批量删除")
    public ApiResult<Void> batchDelete(HttpServletRequest request,
        @ProjectRoleAuth(role = ProjectRoleAuthEnum.DEVELOPER, checkLock = true)
        @RequestBody @PipelineAuth() @Valid TPIdComboVO vo) {

        //保存一份被删除的节点 用于redo undo功能
        List<TaskDTO> hasBeenDeletedTasks = Lists.newArrayList();
        JSONObject context = new JSONObject();

        taskService.batchDelete(context, vo.getTaskIds(), hasBeenDeletedTasks, vo.getPipelineId());
        if (request != null) {
            actionService.addActionForTask(ActionEnum.DELETE, hasBeenDeletedTasks, null, null, context);
        }

        return ApiResult.valueOf(ApiResultCode.SUCCESS);
    }

    @PostMapping(value = "/batchForbidden")
    @ResponseBody
    @Transactional
    @ApiOperation(value = "批量禁止", notes = "批量禁止")
    public ApiResult<Void> batchForbidden(HttpServletRequest request,
        @ProjectRoleAuth(role = ProjectRoleAuthEnum.DEVELOPER, checkLock = true)
        @PipelineAuth() @RequestBody @Valid TPIdComboVO param) {

        List<Long> taskIds = param.getTaskIds();
        List<TaskDTO> ctx1 = Lists.newArrayList();
        List<TaskDTO> ctx2 = Lists.newArrayList();
        for (Long taskId : taskIds) {
            if (request != null) {
                ctx1.add(taskService.queryById(taskId));
            }

            TaskVO taskVO = new TaskVO(taskId, param.getPipelineId()).initForbiddenInfo();
            ApiResult<Long> res = this.simpleUpdate(null, taskVO);
            if (res.getResult() == null || res.getCode() != ApiResultCode.SUCCESS.getCode()) {
                taskService.clearTaskUtil(param.getPipelineId());
                logger.error("API /task/simpleUpdate failed during API /task/batchForbidden execution, since {}", res.getMessage());
                return ApiResult.valueOf(ApiResultCode.SYS_ERROR);
            }

            if (request != null) {
                ctx2.add(taskService.queryById(taskId));
            }
        }

        taskService.clearTaskUtil(param.getPipelineId());
        if (request != null) {
            actionService.addActionForTask(ActionEnum.UPDATE, ctx1, ctx2);
        }
        return ApiResult.valueOf(ApiResultCode.SUCCESS);
    }

    @PostMapping(value = "/batchUnForbidden")
    @ResponseBody
    @Transactional
    @ApiOperation(value = "批量启用", notes = "批量启用")
    public ApiResult<Void> batchUnForbidden(HttpServletRequest request,
        @ProjectRoleAuth(role = ProjectRoleAuthEnum.DEVELOPER, checkLock = true)
        @RequestBody @PipelineAuth() @Valid TPIdComboVO param) {

        List<Long> taskIds = param.getTaskIds();
        List<TaskDTO> ctx1 = Lists.newArrayList();
        List<TaskDTO> ctx2 = Lists.newArrayList();
        for (Long taskId : taskIds) {
            if (request != null) {
                ctx1.add(taskService.queryById(taskId));
            }

            TaskVO taskVO = new TaskVO(taskId, param.getPipelineId()).initUnForbiddenInfo();
            ApiResult<Long> res = this.simpleUpdate(null, taskVO);
            if (res.getResult() == null || res.getCode() != ApiResultCode.SUCCESS.getCode()) {
                taskService.clearTaskUtil(param.getPipelineId());
                logger.error("API /task/simpleUpdate failed during API /task/batchForbidden execution, since {}", res.getMessage());
                return ApiResult.valueOf(ApiResultCode.SYS_ERROR);
            }

            if (request != null) {
                ctx2.add(taskService.queryById(taskId));
            }
        }

        taskService.clearTaskUtil(param.getPipelineId());
        if (request != null) {
            actionService.addActionForTask(ActionEnum.UPDATE, ctx1, ctx2);
        }
        return ApiResult.valueOf(ApiResultCode.SUCCESS);
    }

    @PostMapping(value = "/deleteById")
    @ResponseBody
    @Transactional
    @ApiOperation(value = "删除算子", notes = "根据id删除单个算子")
    public ApiResult<Void> deleteById(HttpServletRequest request,
        @ProjectRoleAuth(role = ProjectRoleAuthEnum.DEVELOPER, checkLock = true)
        @RequestBody @ProjectAuth(auth = ProjectAuthEnum.WRITE) TaskVO vo) {
        if (vo.getUserId() == null) {
            vo.setUserId(JwtUtil.getCurrentUserId());
        }
        List<WidgetDTO> widgets = widgetService.queryByTaskIdAndType(vo.getId());
        if (CollectionUtils.isNotEmpty(widgets)) {
            logger.error("API /task/deleteById failed, since {}", "该节点有可视化图表或画板上有图表依赖该节点，请先删除图表!");
            return ApiResult
                .valueOf(ApiResultCode.SYS_ERROR, null, "该节点有可视化图表或画板上有图表依赖该节点，请先删除图表!");
        }
        TaskDTO taskNode = null;
        if (request != null) {
            taskNode = taskService.queryById(vo.getId());
        }
        taskService.initTaskUtil(vo.getPipelineId());
        boolean flag = taskService.deleteTaskNode(vo.getId());
        if (!flag) {
            taskService.clearTaskUtil(vo.getPipelineId());
            logger.error("API /task/deleteById failed, since {}", "删除节点错误");
            return ApiResult.valueOf(ApiResultCode.SYS_ERROR, null, "删除节点错误!!!");
        }
        if (request != null) {
            List<TaskDTO> array = new ArrayList<>();
            array.add(taskNode);
            actionService.addActionForTask(ActionEnum.DELETE, array, null);
        }

        return ApiResult.valueOf(ApiResultCode.SUCCESS);
    }

    @PostMapping(value = "/queryFullInputById")
    @ResponseBody
    @Transactional
    @ApiOperation(value = "查询完整的input表信息", notes = "查询完整的input表信息")
    public ApiResult<TaskVO> queryFullInputById(HttpServletRequest request,
        @RequestBody @ProjectAuth(auth = ProjectAuthEnum.WRITE) TaskVO vo) {
        if (vo.getId() == null || vo.getId() <= 0L) {
            logger.warn("API /task/queryFullInputById failed, since {}", "pipelineId is empty");
            return ApiResult.valueOf(ApiResultCode.PARAM_ERROR);
        }
        TaskVO taskVO = taskService.queryFullInputById(vo.getId());
        return ApiResult.valueOf(taskVO);
    }

    @PostMapping(value = "/queryInputTables")
    @ResponseBody
    @Transactional
    @ApiOperation(value = "查询完整的input表", notes = "查询完整的input表")
    public ApiResult<JSONArray> queryInputTables(HttpServletRequest request,
        @RequestBody TaskVO vo) {
        if (vo.getId() == null || vo.getId() <= 0L) {
            logger.warn("API /task/queryInputTables failed, since {}", "pipelineId is empty");
            return ApiResult.valueOf(ApiResultCode.PARAM_ERROR);
        }

        TaskVO taskVO = taskService.queryFullInputById(vo.getId());
        JSONObject data = taskVO.getData();
        JSONArray input = data.getJSONArray("input");
        JSONArray tables = new JSONArray();
        //[
        // datasource1: tablename1,
        // datasource2: tablename2,
        //  ....
        // ]
        for (int i = 0; i < input.size(); ++i) {
            tables.add(
                "$" + input.getJSONObject(i).getString("nodeName") + "$" + input.getJSONObject(i)
                    .getString("tableName"));
        }
        return ApiResult.valueOf(tables);
    }


    @PostMapping(value = "/favourite")
    @ApiOperation(value = "收藏算子或取消收藏")
    public ApiResult<Boolean> favourite(@Valid @RequestBody OperatorFavouriteVO vo) {
        OperatorFavouriteDTO dto = DozerUtil.mapper(vo, OperatorFavouriteDTO.class);
        operatorFavouriteService.favourite(dto);
        return ApiResult.valueOf(true);
    }

    @RequestMapping("/download")
    @ApiOperation(value = "下载中间结果", notes = "下载中间结果")
    public void download(HttpServletRequest request, HttpServletResponse response,
        @RequestParam("taskId") Long taskId) {
        taskService.downloadTable(response, taskId);
    }

    @PostMapping(value = "/deleteLine")
    @ResponseBody
    @Transactional
    @ApiOperation(value = "删除连线", notes = "删除连线")
    public ApiResult<Void> deleteLine(HttpServletRequest request,
        @ProjectRoleAuth(role = ProjectRoleAuthEnum.DEVELOPER, checkLock = true)
        @RequestBody @PipelineAuth() TaskVO vo) {
        if (StringUtils.isEmpty(vo.getParentId()) || StringUtils.isEmpty(vo.getChildId())
            || vo.getPipelineId() == null) {
            logger.warn("API /task/deleteLine failed, since {}", "pipelineId is empty");
            return ApiResult.valueOf(ApiResultCode.PARAM_ERROR);
        }
        List<TaskDTO> ctx1 = new ArrayList<>();
        List<TaskDTO> ctx2 = new ArrayList<>();
        JSONObject context = new JSONObject();
        if (request != null) {
            taskService
                .findDescendantsOrAncestors(Long.parseLong(vo.getParentId()), ctx1, null, true);
        }
        taskService.initTaskUtil(vo.getPipelineId());
        boolean flag = taskService
            .deleteLine(Long.parseLong(vo.getParentId()), Long.parseLong(vo.getChildId()));
        if (!flag) {
            taskService.clearTaskUtil(vo.getPipelineId());
            logger.error("API /task/deleteLine failed, since {}", "删除连线错误");
            return ApiResult.valueOf(ApiResultCode.SYS_ERROR, null, "删除连线错误");
        }
        if (request != null) {
            ctx2.add(taskService.queryById(Long.parseLong(vo.getParentId())));
            taskService
                .findDescendantsOrAncestors(Long.parseLong(vo.getChildId()), ctx2, null, true);
            actionService.addActionForTask(ActionEnum.UPDATE, ctx1, ctx2, null, context);
        }
        return ApiResult.valueOf(ApiResultCode.SUCCESS);
    }

    @PostMapping(value = "/connectLine")
    @Transactional
    @ApiOperation(value = "拖拽连线", notes = "拖拽连线")
    public ApiResult<Long> connectLine(HttpServletRequest request,
        @ProjectRoleAuth(role = ProjectRoleAuthEnum.DEVELOPER, checkLock = true)
        @RequestBody @PipelineAuth() TaskVO vo) {
        if (vo.getPipelineId() == null || vo.getPipelineId() <= 0L) {
            logger.warn("API /task/connectLine failed, since {}", "pipelineId is empty");
            return ApiResult.valueOf(ApiResultCode.PARAM_ERROR, null, "无pipelineId");
        }
        if (StringUtils.isEmpty(vo.getParentId()) || StringUtils.isEmpty(vo.getChildId())) {
            logger.warn("API /task/connectLine failed, since {}", "childrenId is empty");
            return ApiResult.valueOf(ApiResultCode.PARAM_ERROR, null, "没有childrenId");
        }
        if (vo.getProjectId() <= 0L) {
            logger.warn("API /task/connectLine failed, since {}", "projectId is empty");
            return ApiResult.valueOf(ApiResultCode.PARAM_ERROR);
        }

        Long referParentId = Long.parseLong(vo.getParentId());
        Long referChildId = Long.parseLong(vo.getChildId());
        TaskDTO parentNode = taskService.queryById(referParentId);
        TaskDTO childNode = taskService.queryById(referChildId);
        if (TaskTypeEnum.TASK_TYPE_GRAPH.getVal() == parentNode.getType()) {
            logger.error("API /task/connectLine failed, since {}",
                ApiResultCode.GRAPH_NO_CHILD.getMessage());
            return ApiResult.valueOf(ApiResultCode.GRAPH_NO_CHILD);
        }
        if (taskService.isBrotherNode(parentNode, childNode)) {
            logger.error("API /task/connectLine failed, since {}",
                ApiResultCode.FORBIDDEN_CONNECT.getMessage());
            return ApiResult.valueOf(ApiResultCode.FORBIDDEN_CONNECT);
        }
        List<TaskDTO> ctx1 = new ArrayList<>();
        List<TaskDTO> ctx2 = new ArrayList<>();
        JSONObject context = new JSONObject();
        String returnMsg = "";
        if (request != null) {
            ctx1.add(parentNode);
            taskService.findDescendantsOrAncestors(referChildId, ctx1, null, true);
        }
        String childIdParent = taskService
            .appendIds(parentNode.getChildId(), referChildId.toString());
        parentNode.setChildId(childIdParent);

        String parentIdChild = taskService
            .appendIds(childNode.getParentId(), referParentId.toString());
        childNode.setParentId(parentIdChild);
        TaskDTO tmpDto = new TaskDTO();
        tmpDto.setId(parentNode.getId());
        tmpDto.setChildId(childIdParent);
        tmpDto.setPipelineId(vo.getPipelineId());
        tmpDto.setProjectId(vo.getProjectId());
        tmpDto.setType(parentNode.getType());

        taskService.update(tmpDto);

        tmpDto.setId(childNode.getId());
        tmpDto.setType(childNode.getType());
        tmpDto.setParentId(parentIdChild);
        tmpDto.setChildId(childNode.getChildId());
        JSONObject data = new JSONObject();
        data.put("algType", childNode.view().getData().getString("algType"));
        data.put("isSimple", false);
        data.put("connected", true);
        tmpDto.setDataJson(JSONObject.toJSONString(data));
        //TODO AY
        if (childNode.getType() == TaskTypeEnum.TASK_TYPE_ALGOPY.getVal()){
            tmpDto = taskService.dataJsonUpdate(parentNode, childNode);
            if (JSONObject.parseObject(tmpDto.getDataJson()).containsKey("warning")){
                if (JSONObject.parseObject(tmpDto.getDataJson()).getString("warning")
                        .equals(ApiResultCode.SMOOTH_DATA_PRE_REQ.getMessage())){
                    //return ApiResult.valueOf(ApiResultCode.SMOOTH_DATA_PRE_REQ);
                }
            }
        }else if (childNode.getType() == TaskTypeEnum.TASK_TYPE_JLAB.getVal()){

            Long userId = JwtUtil.getCurrentUserId();
            jlabService.execApp(userId, 0L, ExeFlagEnum.LOAD_DATA.getVal(), parentNode, childNode);
        }

        ApiResult<Long> updateResult = this.simpleUpdate(null, tmpDto.view());

        if (updateResult.getResult() == null || updateResult.getCode() != ApiResultCode.SUCCESS
            .getCode()) {
            logger.warn("API /task/connect line failed, since {}", updateResult.getMessage());
            taskService.clearTaskUtil(vo.getPipelineId());
            return ApiResult.valueOf(ApiResultCode.SYS_ERROR, null, updateResult.getMessage());
        }

        taskService.updateTaskNode(vo.getPipelineId(), childNode, "update");
        taskService.updateTaskNode(vo.getPipelineId(), parentNode, "update");
        taskService.updateTaskName(parentNode, "update");
        taskService.updateTaskName(childNode, "update");
        boolean flag = taskService
            .batchUpdatePositionForConnectLine(vo.getPipelineId(), referParentId, referChildId);
        if (!flag) {
            taskService.clearTaskUtil(vo.getPipelineId());
            logger.warn("API /task/connect line failed, since {}", "批量更新位置失败");
            return ApiResult.valueOf(ApiResultCode.SYS_ERROR, null, "批量更新位置失败");
        }

        if (childNode.getType().equals(TaskTypeEnum.TASK_TYPE_CLEAN.getVal())) {
            if (taskService.isSimilarSourceTable(referParentId, referChildId)) {
                //如果跟原来数据集的列名和数据类型相同，则不删除历史记录, 但需要更新第一条action
                tColumnService.updateOriginalDataAction(referParentId, referChildId);

            } else {
                //否则，先将原先的历史记录清除
                taskService.deleteLineInitContext(context, childNode.getId());
                taskService.dropGenerateTables(childNode);
                //并在 清洗节点连线后添加一条原始数据记录
                TaskInstanceDTO cleanAction = tColumnService
                    .addOriginalDataAction(taskService.queryById(referChildId));
                List<TaskInstanceDTO> deleteActions = Lists.newArrayList();
                deleteActions.add(cleanAction);
                JSONObject json = new JSONObject();
                json.put(String.valueOf(childNode.getId()), deleteActions);
                context.put("deleteActions", json);
                returnMsg = "替换数据与原有数据格式不一致, 已重新构建清洗节点";
            }
        }

        if (request != null) {
            taskService.findDescendantsOrAncestors(referParentId, ctx2, null, true);
            actionService.addActionForTask(ActionEnum.UPDATE, ctx1, ctx2, null, context);
        }
        taskService.setAutoJoinFlag(childNode.getId());
        return ApiResult.valueOf(ApiResultCode.SUCCESS, null, returnMsg);
    }

    @PostMapping(value = "/addUnionJoinNode")
    @Transactional
    @ApiOperation(value = "拖拽新增union或者join节点", notes = "拖拽新增union或者join节点")
    public ApiResult<TaskVO> addUnionJoinNode(
        @RequestBody JSONObject params) {
        if (params == null || !params.containsKey("params")) {
            logger.warn("API /task/addUnionJoinNode failed, since {}", "pipelineId is empty");
            return ApiResult.valueOf(ApiResultCode.PARAM_ERROR);
        }
        JSONArray paramArray = params.getJSONArray("params");
        List<TaskVO> vos = new ArrayList<>();
        for (int i = 0; i < paramArray.size(); ++i) {
            JSONObject obj = paramArray.getJSONObject(i);
            TaskVO taskVO = JSONObject.parseObject(JSONObject.toJSONString(obj), TaskVO.class);
            if (taskVO != null) {
                vos.add(taskVO);
            }
        }
        List<TaskDTO> ctx1 = new ArrayList<>();
        List<TaskDTO> ctx2 = new ArrayList<>();
        List<TaskDTO> ctx3 = new ArrayList<>();
        List<Long> existIds = new ArrayList<>();
        List<Long> addIds = new ArrayList<>();
        TaskVO newVo = null;
        if (vos.size() == 1) {
            // 两个已有节点进行union或者join操作
            TaskVO vo = vos.get(0);
            String parentId = vo.getParentId();
            if (StringUtils.isEmpty(parentId) || parentId.split(",").length != 2) {
                logger.warn("API /task/addUnionJoinNode failed, since {}", "parentId is empty");
                return ApiResult.valueOf(ApiResultCode.PARAM_ERROR);
            }
            ApiResult<TaskVO> loadResult = load(null, vo);
            if (loadResult.getCode() != ApiResultCode.SUCCESS.getCode()) {
                return loadResult;
            }
            newVo = loadResult.getResult();
            addIds.add(newVo.getId());
            String[] parentIds = parentId.split(",");
            // 修改已有节点的子节点
            for (String id : parentIds) {
                TaskDTO dto = taskService.queryById(Long.parseLong(id));
                ctx1.add(SerializationUtils.clone(dto));
                existIds.add(dto.getId());
                dto.setId(dto.getId());
                dto.setChildId(taskService.appendIds(dto.getChildId(), newVo.getId().toString()));
                taskService.update(dto);
                taskService.updateTaskName(dto, "update");
                taskService.updateTaskNode(dto.getPipelineId(), dto, "update");
            }
            TaskDTO newDto = newVo.toTask();
            boolean flag = taskService.updatePositionForJoinOrUnion(vo.getPipelineId(), newDto);
            newVo = newDto.view();
            if (!flag) {
                taskService.clearTaskUtil(newVo.getPipelineId());
                logger.error("API /task/addUnionJoinNode failed, since {}", "更新失败");
                return ApiResult.valueOf(ApiResultCode.SYS_ERROR, null, "更新位置失败");
            }

        } else if (vos.size() == 2) {
            // 拖拽新节点和已有节点进行union或者join操作
            TaskVO dragNodeVo = vos.get(0);
            // join 或者union节点，新增
            TaskVO vo = vos.get(1);
            // 已经存在的父节点
            String parentId = vo.getParentId();
            TaskDTO existParentNode = taskService.queryById(Long.parseLong(parentId));
            ctx1.add(SerializationUtils.clone(existParentNode));
            existIds.add(Long.parseLong(parentId));

            ApiResult<TaskVO> loadResult = load(null, dragNodeVo);
            if (loadResult.getCode() != ApiResultCode.SUCCESS.getCode()) {
                return loadResult;
            }
            ApiResult<TaskVO> loadResult2 = this.load(null, vo);
            if (loadResult2.getCode() != ApiResultCode.SUCCESS.getCode()) {
                return loadResult2;
            }
            newVo = loadResult2.getResult();
            TaskVO dragNewVo = loadResult.getResult();
            addIds.add(newVo.getId());
            addIds.add(dragNewVo.getId());
            TaskDTO dragDto = dragNewVo.toTask();
            dragDto.setChildId(newVo.getId().toString());
            boolean flag = taskService
                .updatePositionDragAddForJoinOrUnion(vo.getPipelineId(), Long.parseLong(parentId),
                    dragDto);
            if (!flag) {
                taskService.clearTaskUtil(vo.getPipelineId());
                logger.error("API /task/addUnionJoinNode failed, since {}", "更新位置失败");
                return ApiResult.valueOf(ApiResultCode.SYS_ERROR, null, "更新位置失败");
            }

            TaskDTO newDto = newVo.toTask();
            newDto.setParentId(
                taskService.appendIds(newDto.getParentId(), dragDto.getId().toString()));

            flag = taskService.updatePositionForJoinOrUnion(vo.getPipelineId(), newDto);
            newVo = newDto.view();
            if (!flag) {
                taskService.clearTaskUtil(newVo.getPipelineId());
                logger.error("API /task/addUnionJoinNode failed, since {}", "更新位置失败");
                return ApiResult.valueOf(ApiResultCode.SYS_ERROR, null, "更新位置失败");
            }

            // 更新已有节点的childId
            existParentNode.setChildId(
                taskService.appendIds(existParentNode.getChildId(), newVo.getId().toString()));
            if (!taskService.update(existParentNode)) {
                taskService.clearTaskUtil(newVo.getPipelineId());
                logger.error("API /task/addUnionJoinNode failed, since {}", "更新已有节点childId失败");
                return ApiResult.valueOf(ApiResultCode.SYS_ERROR, null, "更新已有节点childId失败!!");
            }
            taskService.updateTaskNode(existParentNode.getPipelineId(), existParentNode, "update");
        } else {
            return ApiResult.valueOf(ApiResultCode.PARAM_ERROR);
        }
        // 强制刷新，更新配置文件
        TaskDTO tmpDto = new TaskDTO();
        tmpDto.setId(newVo.getId());
        tmpDto.setPipelineId(newVo.getPipelineId());
        tmpDto.setProjectId(newVo.getProjectId());
        tmpDto.setParentId(newVo.getParentId());
        tmpDto.setType(newVo.getType());
        JSONObject data = new JSONObject();
        data.put("algType", newVo.getData().getString("algType"));
        tmpDto.setDataJson(JSONObject.toJSONString(data));
        TaskVO newTaskVO = tmpDto.view();
        ApiResult<Long> updateResult = this.update(null, newTaskVO);
        if (updateResult.getCode() != ApiResultCode.SUCCESS.getCode()) {
            taskService.clearTaskUtil(newVo.getPipelineId());
            logger.error("API /task/addUnionJoinNode failed, since {}", "强制更新失败");
            return ApiResult.valueOf(ApiResultCode.SYS_ERROR, null, "强制更新失败");
        }
        // autoJoin
        taskService.setAutoJoinFlag(newVo.getId());

        for (Long id : existIds) {
            ctx2.add(taskService.queryById(id));
        }
        for (Long id : addIds) {
            ctx3.add(taskService.queryById(id));
        }
        actionService.addActionForTask(ActionEnum.ADD_UPDATE, ctx1, ctx2, ctx3);
        return ApiResult.valueOf(newVo);
    }

    @PostMapping(value = "/complexLoad")
    @Transactional
    @ApiOperation(value = "加号添加算子", notes = "点击加号添加算子")
    public ApiResult<TaskVO> complexLoad(
        @ProjectRoleAuth(role = ProjectRoleAuthEnum.DEVELOPER, checkLock = true)
        @RequestBody @PipelineAuth() TaskVO vo) {
        String parentId = vo.getParentId();
        String childId = vo.getChildId();
        ApiResult<TaskVO> loadResult = load(null, vo);
        if (loadResult.getResult().getData().containsKey("warning")){
            if (loadResult.getResult().getData().getString("warning").equals(ApiResultCode.SMOOTH_DATA_PRE_REQ)){
                return ApiResult.valueOf(ApiResultCode.SMOOTH_DATA_PRE_REQ);
            }
        }
        if ((StringUtils.isEmpty(parentId) && StringUtils.isEmpty(childId))
            || loadResult.getCode() != ApiResultCode.SUCCESS.getCode()) {
            //加号添加 -> 不走这里
            if (StringUtils.isEmpty(parentId) && StringUtils.isEmpty(childId)
                && loadResult.getCode() == ApiResultCode.SUCCESS.getCode()) {
                List<TaskDTO> array = new ArrayList<>();
                array.add(taskService.queryById(loadResult.getResult().getId()));
                actionService.addActionForTask(ActionEnum.ADD, array, null);
            }
            // 拖拽添加, 不关联任何节点
            return loadResult;
        }
        List<TaskDTO> ctx1 = new ArrayList<>();   //记录操作前现场
        List<TaskDTO> ctx2 = new ArrayList<>();   //记录操作后现场
        List<TaskDTO> ctx3 = new ArrayList<>();
        // 保留更新前现场
        if (StringUtils.isNotEmpty(parentId)) {
            taskService.findDescendantsOrAncestors(Long.parseLong(parentId), ctx1, null, true);
        } else if (StringUtils.isNotEmpty(childId)) {
            taskService.findDescendantsOrAncestors(Long.parseLong(childId), ctx1, null, true);
        }
        TaskVO newVo = loadResult.getResult();
        TaskDTO childTaskDTO = null;
        if (StringUtils.isNotEmpty(parentId)) {
            // 父节点不为空,父亲节点链接建立
            TaskDTO parentNode = taskService.queryById(Long.parseLong(parentId));
            if (JSONObject.parseObject(parentNode.getDataJson()).getJSONArray("output").size() ==0){
                taskService.delete(newVo.getId());
                logger.error("API /task/complexLoad failed, since {}", "parent's output is empty");
                return ApiResult.valueOf(ApiResultCode.PARENT_TABLE_EMPTY);
            }
            if (dagScheduler.isStillRunning(parentNode.getId())){
                taskService.delete(newVo.getId());
                logger.error("API /task/complexLoad failed, since {}", "parent is still running, please wait.");
                return ApiResult.valueOf(ApiResultCode.JOB_HAVE_NOT_DONE_YET);
            }
            TaskDTO tmpDto = new TaskDTO();
            tmpDto.setId(parentNode.getId());
            String childIdParent = parentNode.getChildId();
            if (StringUtils.isNotEmpty(childId)) {
                // 断开连接
                childIdParent = taskService.removeIds(childIdParent, childId);
            }
            // 建立链接
            childIdParent = taskService.appendIds(childIdParent, newVo.getId().toString());
            tmpDto.setChildId(childIdParent);
            taskService.update(tmpDto);
            TaskDTO fullDto = taskService.queryById(tmpDto.getId());
            taskService.updateTaskName(fullDto, "update");
            taskService.updateTaskNode(fullDto.getPipelineId(), fullDto, "update");
        }
        if (StringUtils.isNotEmpty(childId)) {
            // 子节点不为空，子节点链接建立
            TaskDTO childNode = taskService.queryById(Long.parseLong(childId));
            TaskDTO tmpDto = new TaskDTO();
            tmpDto.setId(childNode.getId());
            String parentIdChild = childNode.getParentId();
            if (StringUtils.isNotEmpty(parentId)) {
                // 断开连接
                parentIdChild = taskService.removeIds(parentIdChild, parentId);
            }
            parentIdChild = taskService.appendIds(parentIdChild, newVo.getId().toString());
            tmpDto.setParentId(parentIdChild);
            taskService.update(tmpDto);
            TaskDTO fullDto = taskService.queryById(tmpDto.getId());
            taskService.updateTaskName(fullDto, "update");
            taskService.updateTaskNode(fullDto.getPipelineId(), fullDto, "update");
            childTaskDTO = fullDto;
        }
        // 强制刷新，更新配置文件

        TaskDTO tmpDto = new TaskDTO();
        tmpDto.setId(newVo.getId());
        tmpDto.setPipelineId(newVo.getPipelineId());
        tmpDto.setProjectId(newVo.getProjectId());
        tmpDto.setType(newVo.getType());
        tmpDto.setParentId(parentId);
        tmpDto.setChildId(childId);
        JSONObject data = new JSONObject();
        //TODO REMOVE
        if (newVo.getType().equals(TaskTypeEnum.TASK_TYPE_ALGO.getVal())
            && AlgEnum.PCA_DENSE.getVal() == newVo.getData().getInteger("algType")) {
            // pca特殊处理
            taskService.addPcaAlgorithmHandler(newVo, childId);
            data.put("setParams", newVo.getData().getJSONArray("setParams"));
        }
        data.put("algType", newVo.getData().getString("algType"));
        data.put("isSimple", false);
        tmpDto.setDataJson(JSONObject.toJSONString(data));
        TaskVO newTaskVO = tmpDto.view();
        boolean flag = taskService
            .batchUpdatePosition(newVo.getPipelineId(), tmpDto, parentId, childId);
        if (!flag) {
            taskService.clearTaskUtil(newVo.getPipelineId());
            logger.error("API /task/complexLoad failed, since {}", "批量更新位置失败");
            return ApiResult.valueOf(ApiResultCode.SYS_ERROR, null, "批量更新位置失败");
        }
        ApiResult<Long> updateResult = this.simpleUpdate(null, newTaskVO);
        if (updateResult.getResult() == null ||
                updateResult.getCode() != ApiResultCode.SUCCESS.getCode()) {
            taskService.clearTaskUtil(newTaskVO.getPipelineId());
            logger.error("API /task/complexLoad failed, since {}", updateResult.getMessage());
            if(updateResult.getCode() == ApiResultCode.AUTO_TRIGGER_FAILED.getCode()){
                return ApiResult.valueOf(ApiResultCode.AUTO_TRIGGER_FAILED, null, updateResult.getMessage());
            }else {
                return ApiResult.error(ApiResultCode.SYS_ERROR, updateResult.getMessage());
            }
        }
        TaskVO tmpNewVo = taskService.queryById(newTaskVO.getId()).view();
        if (newVo.getType().equals(TaskTypeEnum.TASK_TYPE_CLEAN.getVal())) {
            if (tmpNewVo.getData().getJSONArray("input").size() > 0) {
                //清洗节点连线后添加一条原始数据记录Action
                tColumnService.addOriginalDataAction(tmpNewVo.toTask());
            } else {
                logger.error("API /task/complexLoad failed, since {}", "node's input is empty");
                return ApiResult.valueOf(ApiResultCode.TABLE_EMPTY);
            }
        }
        JSONObject context = new JSONObject();
        //如果当前添加的节点的子节点是清洗。将子节点历史记录删除，再添加一条原始数据Action
        if (null != childTaskDTO && TaskTypeEnum.TASK_TYPE_CLEAN.getVal()
            .equals(childTaskDTO.getType())
            && !taskService.isSameSourceTable(newVo.getId(), childTaskDTO.getId())) {
            taskService.deleteLineInitContext(context, childTaskDTO.getId());
            taskService.dropGenerateTables(childTaskDTO);
            JSONObject childTaskData = childTaskDTO.view().getData();
            childTaskData.put("input", tmpNewVo.getData().getJSONArray("output"));
            childTaskData.put("output", tmpNewVo.getData().getJSONArray("output"));
            childTaskDTO.setDataJson(childTaskData.toJSONString());
            taskService.update(childTaskDTO);
            tColumnService.addOriginalDataAction(childTaskDTO);
        }

        // 记录操作后现场
        Set<Long> excludeIds = new HashSet<>();
        excludeIds.add(newVo.getId());
        if (StringUtils.isNotEmpty(parentId)) {
            taskService
                .findDescendantsOrAncestors(Long.parseLong(parentId), ctx2, excludeIds, true);
        } else if (StringUtils.isNotEmpty(childId)) {
            taskService.findDescendantsOrAncestors(Long.parseLong(childId), ctx2, excludeIds, true);
        }
        ctx3.add(taskService.queryById(newVo.getId()));
        actionService.addActionForTask(ActionEnum.ADD_UPDATE, ctx1, ctx2, ctx3, context);

        if (null != childTaskDTO && TaskTypeEnum.TASK_TYPE_ALGOPY.getVal()
                .equals(childTaskDTO.getType())){
            taskService.dataJsonUpdate(tmpNewVo.toTask(), childTaskDTO);
        }

        if (null != childTaskDTO && TaskTypeEnum.TASK_TYPE_JLAB.getVal()
                .equals(childTaskDTO.getType()) ){
            Long userId = JwtUtil.getCurrentUserId();
            jlabService.execApp(userId, 0L, ExeFlagEnum.LOAD_DATA.getVal(), tmpNewVo.toTask(), childTaskDTO);
        }
        if (parentId != null && null == childTaskDTO && vo.getType() == TaskTypeEnum.TASK_TYPE_JLAB.getVal()){
            TaskDTO parenTask = taskService.queryById(Long.valueOf(parentId));
            Long userId = JwtUtil.getCurrentUserId();
            jlabService.execApp(userId, 0L, ExeFlagEnum.LOAD_DATA.getVal(), parenTask, tmpNewVo.toTask());
        }

        return ApiResult.valueOf(tmpNewVo);
    }

    @PostMapping(value = "/queryJoinResult")
    @ResponseBody
    @ApiOperation(value = "查询Join结果", notes = "查询Join结果")
    public ApiResult<JSONObject> queryJoinResult(HttpServletRequest request,
        @RequestBody @ProjectAuth(auth = ProjectAuthEnum.READ) TaskVO vo) {
        JSONObject ret = taskService.queryJoinResult(vo);
        if (vo.getException() != null) {
            logger
                .warn("API /task/queryJoinResult failed, since {}", vo.getException().getMessage());
            return ApiResult.valueOf(ApiResultCode.SYS_ERROR, null, vo.getException().getMessage());
        }
        return ApiResult.valueOf(ret);
    }

    @PostMapping(value = "/joinRecommend")
    @ResponseBody
    @ApiOperation(value = "查询Join结果", notes = "查询Join结果")
    public ApiResult<JSONArray> joinRecommend(HttpServletRequest request,
        @RequestBody JSONObject param) {
        JSONArray ret = autoJoinService.joinRecommend(param);
        return ApiResult.valueOf(ret);
    }

    @PostMapping(value = "/getFasttext")
    @ResponseBody
    @ApiOperation(value = "查询Join结果", notes = "查询Join结果")
    public ApiResult<Double> getFasttext(HttpServletRequest request,
        @RequestBody @ProjectAuth(auth = ProjectAuthEnum.READ) TaskVO vo) {
        Long taskId = vo.getId();
        TaskDTO task = taskService.queryById(taskId);
        JSONObject conf = JSONObject.parseObject(task.getDataJson());
        String str1 = conf.getString("str1");
        String str2 = conf.getString("str2");
        double ret = fastTextService.getFasttext(fastTextService.getAutojoinModel(), str1, str2);
        return ApiResult.valueOf(ret);
    }

    @PostMapping(value = "/saveToDataset")
    @ResponseBody
    @ApiOperation(value = "保存到数据管理", notes = "保存到数据管理")
    @ApiGroup(groups = TaskSaveVO.Dataset.class)
    public ApiResult<Boolean> saveToDataset(HttpServletRequest req, @RequestBody TaskSaveVO vo) {
        Boolean ret = false;
        if ((vo.getCategoryId() == null && StringUtils.isBlank(vo.getCategoryName())) ||
            (StringUtils.isBlank(vo.getDatasetName()))) {
            logger.error("API /task/saveToDataSource failed, since {}",
                BaseErrorCode.TASK_SAVE_TO_DATASET_PARAM.getMsg());
            throw new DataScienceException(BaseErrorCode.TASK_SAVE_TO_DATASET_PARAM);
        }
        ret = taskService.saveToDataset(vo);
        return ApiResult.valueOf(ret);
    }

    @PostMapping(value = "/locateDataSource")
    @ResponseBody
    @ApiOperation(value = "定位功能", notes = "根据dataset定位")
    public ApiResult<Set<Long>> getTaskByDataset(
        @ProjectRoleAuth(role = ProjectRoleAuthEnum.VISITOR)
        @RequestBody DatasourceLocateVO vo) {
        Set<Long> taskIds = taskService.locateDataSource(vo.getDatasetId(), vo.getPipelineId());
        return ApiResult.valueOf(taskIds);
    }

    @PostMapping(value = "/saveToDataSource")
    @ResponseBody
    @ApiOperation(value = "保存到数据源", notes = "保存到数据源")
    @ApiGroup(groups = TaskSaveVO.Dataset.class)
    public ApiResult<Boolean> saveToDataSource(HttpServletRequest req, @RequestBody TaskSaveVO vo) {
        Boolean ret = false;
        if ((vo.getCategoryId() == null && StringUtils.isBlank(vo.getCategoryName())) ||
            (StringUtils.isBlank(vo.getDatasetName()))) {
            logger.error("API /task/saveToDataSource failed, since {}",
                BaseErrorCode.TASK_SAVE_TO_DATASET_PARAM.getMsg());
            throw new DataScienceException(BaseErrorCode.TASK_SAVE_TO_DATASET_PARAM);
        }
        ret = taskService.saveToDataSource(vo);
        return ApiResult.valueOf(ret);
    }

    @PostMapping(value = "/queryDataVis")
    @ResponseBody
    @ApiOperation(value = "查询可视化", notes = "查询数据可视化")
    public ApiResult<JSONObject> queryDataVis(HttpServletRequest request,
                                                 @RequestBody @ProjectAuth(auth = ProjectAuthEnum.READ) TaskVO vo) {
        JSONObject ret = taskService.queryDataVis(vo);
        return ApiResult.valueOf(ret);
    }


    @PostMapping(value = "/queryJlabURL")
    @ResponseBody
    @ApiOperation(value = "查询Jlab的url", notes = "查询Jlab的url链接")
    public ApiResult<String> queryJlabURL(HttpServletRequest request,
                                          @RequestBody @ProjectAuth(auth = ProjectAuthEnum.READ) TaskVO vo) {
        vo.setUserId(JwtUtil.getCurrentUserId());
        return ApiResult.valueOf(jlabService.queryURL(vo));
    }

    @PostMapping(value = "/loadJlabls")
    @ResponseBody
    @ApiOperation(value = "加载formData下拉框", notes = "加载formData下拉框")
    public ApiResult<String> loadlJlabls(HttpServletRequest request,
                                         @RequestBody @ProjectAuth(auth = ProjectAuthEnum.READ) TaskVO vo) {
        return ApiResult.valueOf(jlabService.loadJlabls(vo));
    }
}